Hadoop Streamingジョブの処理結果を圧縮する
今回はHadoop Streamingジョブの処理結果を圧縮する方法について紹介します。以前紹介した以下のエントリーに対してEMRで利用可能な圧縮方式を試しています。
結論
先に結論を書いておきます。EMRで利用可能な圧縮形式の一覧は以下でした。
- DEFLATE(zlib)
- gzip
- bzip2
- LZO
- LZ4
- Snappy
また、Hadoop Streamingジョブの処理結果を圧縮する際は以下の2つのオプションを利用します。
mapreduce.output.fileoutputformat.compress
mapreduce.output.fileoutputformat.compress.codec
mapreduce.output.fileoutputformat.compress
オプションにtrue
を指定すると圧縮されるようになります。mapreduce.output.fileoutputformat.compress.codec
オプションは圧縮形式に対応したクラス名を指定します。例えばgzipの場合はorg.apache.hadoop.io.compress.GzipCodec
を指定します。
実行環境
- emr-5.6.0 でアプリケーションは Hadoop のみ
- Hadoop 2.7.3
- ハードウェア構成は m1.medium を 1 台(検証用なのでマスターノードのみ)
- 東京リージョン
EMRクラスタの作成
まずはEMRクラスタを作成します。AWS CLIを利用する場合は以下のようなコマンドになります。SubnetId
, log-uri
を自身の環境に合わせて書き換えて下さい。
aws emr create-cluster --auto-scaling-role EMR_AutoScaling_DefaultRole --applications Name=Hadoop \ --ec2-attributes '{"InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-XXXX"}' \ --service-role EMR_DefaultRole --enable-debugging --release-label emr-5.6.0 \ --log-uri 's3n://aws-logs-XXXX-ap-northeast-1/elasticmapreduce/' --name 'compress' \ --instance-groups '[{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"m1.medium","Name":"Master Instance Group"}]' \ --scale-down-behavior TERMINATE_AT_INSTANCE_HOUR --region ap-northeast-1
実際に実行すると作成したEMRクラスタのClusterId
が表示されます。ClusterId
はジョブを実行する際に利用するので控えておいて下さい。
$ aws emr create-cluster ... { "ClusterId": "j-XXXXXX" }
gzip形式
まずはgzip形式で出力してみましょう。AWS CLIの引数に指定するSONファイルを作成して下さい。ファイル名はcompress_gzip_step.json
とし、出力先であるs3://mybucket/output
を自身の環境に合わせて書き換えて下さい。
[ { "Name": "compress_gzip", "Type": "STREAMING", "ActionOnFailure": "CONTINUE", "Args": [ "-D stream.non.zero.exit.is.failure=false", "-D mapreduce.job.reduces=0", "-D stream.map.output=keyonlytext", "-D mapreduce.output.fileoutputformat.compress=true", "-D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec", "-mapper", "grep hbase", "-input", "s3://ap-northeast-1.elasticmapreduce.samples/elb-access-logs/data", "-output", "s3://mybucket/output"] } ]
grepコマンドをHadoop Streaming上で実行する | Developers.IOと比べて以下の2つのオプションを指定しています。
-D mapreduce.output.fileoutputformat.compress=true
-D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec
ではcompress_gzip_step.json
を利用してジョブを登録したいと思います。j-XXXXXXX
を起動したEMRクラスタのClusterIdに書き換えて以下のコマンドを実行して下さい。
aws emr add-steps --cluster-id j-XXXXXXX --steps file://./compress_gzip_step.json --region ap-northeast-1
m1.mediumでマスターノード1台のみ構成なので、10分ほどするとS3に処理結果が出力されると思います。入力ファイルごとに拡張子がgzとなりgzip圧縮されていることが分かります。
その他の圧縮形式
その他の圧縮形式を指定した際の結果を貼っておきます。クラス名はorg.apache.hadoop.io.compress
パッケージに含まれている *1Codecクラスを順番に試してみました。
DEFLATE(zlib)
-D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DeflateCodec
を指定します。
bzip2
-D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.BZip2Codec
を指定します。
LZO
-D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.LzoCodec
を指定します。
LZ4
-D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.Lz4Codec
を指定します。
Snappy
-D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
を指定します。
EMRクラスタの削除
作成したEMRクラスタは忘れずに削除しましょう。AWS CLIで削除する場合は以下のコマンドで削除して下さい。
aws emr terminate-clusters --cluster-id j-XXXXXXX --region ap-northeast-1
削除できているかの確認は以下のコマンドを利用して下さい。"TERMINATED"
と出力されると削除済みという意味です。
aws emr describe-cluster --cluster-id j-XXXXXXX --query Cluster.Status.State --region ap-northeast-1
デフォルトの圧縮形式
mapreduce.output.fileoutputformat.compress.codec
オプションを指定しない場合はデフォルトの圧縮形式としてorg.apache.hadoop.io.compress.DefaultCodec
が利用されるようになっています。そして、DefaultCodecが適用された場合の圧縮形式はDEFLATE(zlib)となります。この辺りは歴史の話になるのですが、先にDefaultCodecが作成され、DEFLATEを明示的に指定できるようにDeflateCodecが作成された *2という経緯があったりします。そのため、DeflateCodec.javaの実装はDefaultCodecを継承しているだけ *3だったりしますw
最後に
EMRで利用可能な圧縮形式を知りたくて調査してみました。LZ4は知らなかったので勉強になりました。参考になれば幸いです。
参考
- クラスターの出力を圧縮する - Amazon EMR
- 圧縮ファイルの処理方法 - Amazon EMR
- Deep Dive - Amazon Elastic MapReduce (EMR)
- Hadoopチューニング - Meta Search